// Copyright 2025 XTX Markets Technologies Limited
//
// SPDX-License-Identifier: GPL-2.0-or-later

#include "CDCDB.hpp"

#include <algorithm>
#include <cstdint>
#include <limits>
#include <memory>
#include <rocksdb/iterator.h>
#include <rocksdb/options.h>
#include <rocksdb/statistics.h>
#include <rocksdb/status.h>
#include <rocksdb/utilities/transaction.h>
#include <rocksdb/utilities/optimistic_transaction_db.h>
#include <unordered_set>


#include "Assert.hpp"
#include "AssertiveLock.hpp"
#include "CDCDBData.hpp"
#include "Common.hpp"
#include "Env.hpp"
#include "Exception.hpp"
#include "MsgsGen.hpp"
#include "RocksDBUtils.hpp"
#include "ShardDB.hpp"
#include "Time.hpp"
#include "XmonAgent.hpp"

// The CDC needs to remember about multi-step transactions which it executes by
// talking to the shards. So essentially we need to store a bunch of queued
// queued requests, and a bunch of currently executing transactions, which are
// waiting for shard responses.
//
// Generally speaking we need to make sure to not have transactions step on each
// others toes. This is especially pertinent for edge locking logic -- given our
// requirements for idempotency (we don't know when a request has gone through)
// when we lock an edge we're really locking it so that shard-specific operations
// don't mess things up, rather than other CDC operations.
//
// The strategy that we adopt to achieve that is as follows:
//
// 1. Each request that comes in gets assigned a txn id. The txn ids are sequential:
//     later request, higher txn ids.
// 2. We specify a set of directory inode ids that a given request will need to lock
//     edges on.
// 3. We store a map InodeId -> []TxnId. The list of TxnIds is ordered. Each TxnId
//     is in the list value for all the InodeId keys it needs locks for. Only TxnIds
//     that have not completed yet are in this map.
// 4. A txn does not start executing unless it is at the head of all the lists in the
//     map above.
//
// This scheme ensures no deadlocks and also fairness (no transaction can ever be
// starved). Divide the requests into the equivalence classes generated by the
// relation "have directories to be locked in common". The scheme above guarantees
// that each class will be run serially, with transactions with earlier txn ids
// being run before txns with later txn ids.
//
// The "move directory" request is a bit of an exception. It needs to do a "no loops"
// check which depends on the entire directory tree. So for simplicity we arrange
// for each move directory request to run serially, by creating a dummy InodeId
// which represents the "move directory lock", so to speak.
//
// The first version of the CDC just executed each transaction serially, but this
// proved way too slow, capping directory creation at around 200req/s. So we did the
// above to pipeline CDC requests. Since this required a change to the CDC RocksDB
// schema, we call the first version of the schema V0, the current one V1.


std::vector<rocksdb::ColumnFamilyDescriptor> CDCDB::getColumnFamilyDescriptors() {
    return {
        {rocksdb::kDefaultColumnFamilyName, {}},
        {"reqQueue", {}},
        {"parent", {}},
        {"enqueued", {}},
        {"executing", {}},
        {"dirsToTxns", {}},
    };
}

std::ostream& operator<<(std::ostream& out, const CDCShardReq& x) {
    out << "CDCShardReq(shid=" << x.shid << ", req=" << x.req << ")";
    return out;
}

std::ostream& operator<<(std::ostream& out, const CDCStep& x) {
    out << "CDCStep(finishedTxns=[";
    for (int i = 0; i < x.finishedTxns.size(); i++) {
        if (i > 0) {
            out << ", ";
        }
        const auto& txn = x.finishedTxns[i];
        out << "<" << txn.first << ", " << txn.second << ">";
    }
    out << "], runningTxns=[";
    for (int i = 0; i < x.runningTxns.size(); i++) {
        if (i > 0) {
            out << ", ";
        }
        const auto& txn = x.runningTxns[i];
        out << "<" << txn.first << ", " << txn.second << ">";
    }
    out << "])";
    return out;
}

std::ostream& operator<<(std::ostream& out, CDCTxnId id) {
    return out << id.x;
}

std::ostream& operator<<(std::ostream& out, const CDCShardResp& x) {
    return out << "CDCShardResp(txnId=" << x.txnId << ", resp=" << x.resp << ")";
}

std::ostream& operator<<(std::ostream& out, const CDCLogEntry& x) {
    out << "CDCLogEntry(logIdx= " << x.logIdx() << ", ";
    out << "cdcReqs=[";
    for (auto& req : x.cdcReqs()) {
        out << req << ", ";
    }
    out << "], shardResps=[";
    for (auto& resp: x.shardResps()) {
        out << resp << ", ";
    }
    out << "])";
    return out;
}

inline bool createCurrentLockedEdgeRetry(TernError err) {
    return
        err == TernError::TIMEOUT || err == TernError::MTIME_IS_TOO_RECENT ||
        err == TernError::MORE_RECENT_SNAPSHOT_EDGE || err == TernError::MORE_RECENT_CURRENT_EDGE;
}

static constexpr InodeId MOVE_DIRECTORY_LOCK = InodeId::FromU64Unchecked(1ull<<63);

struct DirectoriesNeedingLock {
private:
    static constexpr int MAX_SIZE = 3;
    std::array<InodeId, MAX_SIZE> _ids;
    int _size;

public:
    DirectoriesNeedingLock() : _size(0) {
        memset(_ids.data(), 0, _ids.size()*sizeof(decltype(_ids)::value_type));
    }

    int size() const {
        return _size;
    }

    decltype(_ids)::const_iterator begin() const {
        return _ids.begin();
    }

    decltype(_ids)::const_iterator end() const {
        return _ids.begin() + _size;
    }

    void add(InodeId id) {
        ALWAYS_ASSERT(_size != MAX_SIZE);
        ALWAYS_ASSERT(id != InodeId::FromU64Unchecked(0));
        for (InodeId otherId : _ids) {
            if (otherId == id) { return; }
        }
        _ids[_size] = id;
        _size++;
    }
};

// These are all the directories where we'll lock edges given a request.
// These function _must be pure_! We call it repeatedly as if it's a property
// of the request more than a function.
//
// Technically every well formed request will have distinct inode ids, but there
// are parts in the code where this function is called before we know that the
// request is valid. Hence the set semantics of DirectoriesNeedingLock.
static DirectoriesNeedingLock directoriesNeedingLock(const CDCReqContainer& req) {
    DirectoriesNeedingLock toLock;
    switch (req.kind()) {
    case CDCMessageKind::MAKE_DIRECTORY:
        toLock.add(req.getMakeDirectory().ownerId);
        break;
    case CDCMessageKind::RENAME_FILE:
        toLock.add(req.getRenameFile().oldOwnerId);
        toLock.add(req.getRenameFile().newOwnerId);
        break;
    case CDCMessageKind::SOFT_UNLINK_DIRECTORY:
        // Lock needs to be acquired on both owner and target directory.
        // Lock on owner is needed for remove owner step of the state machine.
        // Lock on target is needed so that the target does not get removed by GC after
        // we unlink the owner but before we unlock the edge.
        toLock.add(req.getSoftUnlinkDirectory().ownerId);
        toLock.add(req.getSoftUnlinkDirectory().targetId);
        break;
    case CDCMessageKind::RENAME_DIRECTORY:
        toLock.add(req.getRenameDirectory().oldOwnerId);
        toLock.add(req.getRenameDirectory().newOwnerId);
        // Moving directories is special: it can introduce loops if we're not careful.
        // Instead of trying to not create loops in the context of interleaved transactions,
        // we instead only allow one move directory at a time.
        toLock.add(MOVE_DIRECTORY_LOCK);
        break;
    case CDCMessageKind::HARD_UNLINK_DIRECTORY:
        toLock.add(req.getHardUnlinkDirectory().dirId);
        break;
    case CDCMessageKind::CROSS_SHARD_HARD_UNLINK_FILE:
        toLock.add(req.getCrossShardHardUnlinkFile().ownerId);
        break;
    case CDCMessageKind::ERROR:
        throw TERN_EXCEPTION("bad req type error");
    default:
        throw TERN_EXCEPTION("bad req type %s", (uint8_t)req.kind());
    }
    return toLock;
}

struct StateMachineEnv {
    Env& env;
    rocksdb::ColumnFamilyHandle* defaultCf;
    rocksdb::ColumnFamilyHandle* parentCf;
    rocksdb::Transaction& dbTxn;
    CDCTxnId txnId;
    uint8_t txnStep;
    CDCStep& cdcStep;
    bool finished;

    StateMachineEnv(
        Env& env_, rocksdb::ColumnFamilyHandle* defaultCf_, rocksdb::ColumnFamilyHandle* parentCf_, rocksdb::Transaction& dbTxn_, CDCTxnId txnId_, uint8_t step_, CDCStep& cdcStep_
    ):
        env(env_), defaultCf(defaultCf_), parentCf(parentCf_), dbTxn(dbTxn_), txnId(txnId_), txnStep(step_), cdcStep(cdcStep_), finished(false)
    {}

    InodeId nextDirectoryId(rocksdb::Transaction& dbTxn) {
        std::string v;
        ROCKS_DB_CHECKED(dbTxn.Get({}, defaultCf, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY), &v));
        ExternalValue<InodeIdValue> nextId(v);
        InodeId id = nextId().id();
        nextId().setId(InodeId::FromU64(id.u64 + 1));
        ROCKS_DB_CHECKED(dbTxn.Put(defaultCf, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY), nextId.toSlice()));
        return id;
    }

    ShardReqContainer& needsShard(uint8_t step, ShardId shid, bool repeated) {
        txnStep = step;
        auto& running = cdcStep.runningTxns.emplace_back();
        running.first = txnId;
        running.second.shid = shid;
        running.second.repeated = repeated;
        return running.second.req;
    }

    CDCRespContainer& finish() {
        this->finished = true;
        auto& finished = cdcStep.finishedTxns.emplace_back();
        finished.first = txnId;
        return finished.second;
    }

    void finishWithError(TernError err) {
        this->finished = true;
        ALWAYS_ASSERT(err != TernError::NO_ERROR);
        auto& errored = cdcStep.finishedTxns.emplace_back();
        errored.first = txnId;
        errored.second.setError() = err;
    }
};

constexpr uint8_t TXN_START = 0;

enum MakeDirectoryStep : uint8_t {
    MAKE_DIRECTORY_LOOKUP = 1,
    MAKE_DIRECTORY_CREATE_DIR = 2,
    MAKE_DIRECTORY_LOOKUP_OLD_CREATION_TIME = 3,
    MAKE_DIRECTORY_CREATE_LOCKED_EDGE = 4,
    MAKE_DIRECTORY_UNLOCK_EDGE = 5,
    MAKE_DIRECTORY_ROLLBACK = 6,
};

// Steps:
//
// 1. Lookup if an existing directory exists. If it does, immediately succeed.
// 2. Allocate inode id here in the CDC
// 3. Create directory in shard we get from the inode
// 4. Lookup old creation time for the edge we're about to create
// 5. Create locked edge from owner to newly created directory. If this fail because of bad creation time, go back to 4
// 6. Unlock the edge created in 3
//
// If 4 or 5 fails, 3 must be rolled back. 6 does not fail.
//
// 1 is necessary rather than failing on attempted override because otherwise failures
// due to repeated calls are indistinguishable from genuine failures.
struct MakeDirectoryStateMachine {
    StateMachineEnv& env;
    const MakeDirectoryReq& req;
    MakeDirectoryState state;

    MakeDirectoryStateMachine(StateMachineEnv& env_, const MakeDirectoryReq& req_, MakeDirectoryState state_):
        env(env_), req(req_), state(state_)
    {}

    void resume(const ShardRespContainer* resp) {
        if (env.txnStep == TXN_START) {
            start();
            return;
        }
        if (unlikely(resp == nullptr)) { // we're resuming with no response
            switch (env.txnStep) {
                case MAKE_DIRECTORY_LOOKUP: lookup(); break;
                case MAKE_DIRECTORY_CREATE_DIR: createDirectoryInode(); break;
                case MAKE_DIRECTORY_LOOKUP_OLD_CREATION_TIME: lookupOldCreationTime(); break;
                case MAKE_DIRECTORY_CREATE_LOCKED_EDGE: createLockedEdge(); break;
                case MAKE_DIRECTORY_UNLOCK_EDGE: unlockEdge(); break;
                case MAKE_DIRECTORY_ROLLBACK: rollback(); break;
                default: throw TERN_EXCEPTION("bad step %s", env.txnStep);
            }
        } else {
            switch (env.txnStep) {
                case MAKE_DIRECTORY_LOOKUP: afterLookup(*resp); break;
                case MAKE_DIRECTORY_CREATE_DIR: afterCreateDirectoryInode(*resp); break;
                case MAKE_DIRECTORY_LOOKUP_OLD_CREATION_TIME: afterLookupOldCreationTime(*resp); break;
                case MAKE_DIRECTORY_CREATE_LOCKED_EDGE: afterCreateLockedEdge(*resp); break;
                case MAKE_DIRECTORY_UNLOCK_EDGE: afterUnlockEdge(*resp); break;
                case MAKE_DIRECTORY_ROLLBACK: afterRollback(*resp); break;
                default: throw TERN_EXCEPTION("bad step %s", env.txnStep);
            }
        }
    }

    void start() {
        state.setDirId(env.nextDirectoryId(env.dbTxn));
        lookup();
    }

    void lookup(bool repeated = false) {
        auto& shardReq = env.needsShard(MAKE_DIRECTORY_LOOKUP, req.ownerId.shard(), repeated).setLookup();
        shardReq.dirId = req.ownerId;
        shardReq.name = req.name;
    }

    void afterLookup(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            lookup(true); // retry
        } else if (err == TernError::DIRECTORY_NOT_FOUND) {
            env.finishWithError(err);
        } else if (err == TernError::NAME_NOT_FOUND) {
            // normal case, let's proceed
            createDirectoryInode();
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            const auto& lookupResp = resp.getLookup();
            if (lookupResp.targetId.type() == InodeType::DIRECTORY) {
                // we're good already
                auto& cdcResp = env.finish().setMakeDirectory();
                cdcResp.creationTime = lookupResp.creationTime;
                cdcResp.id = lookupResp.targetId;
            } else {
                env.finishWithError(TernError::CANNOT_OVERRIDE_NAME);
            }
        }
    }

    void createDirectoryInode(bool repeated = false) {
        auto& shardReq = env.needsShard(MAKE_DIRECTORY_CREATE_DIR, state.dirId().shard(), repeated).setCreateDirectoryInode();
        shardReq.id = state.dirId();
        shardReq.ownerId = req.ownerId;
    }

    void afterCreateDirectoryInode(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            // Try again -- note that the call to create directory inode is idempotent.
            createDirectoryInode(true);
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            lookupOldCreationTime();
        }
    }

    void lookupOldCreationTime(bool repeated = false) {
        auto& shardReq = env.needsShard(MAKE_DIRECTORY_LOOKUP_OLD_CREATION_TIME, req.ownerId.shard(), repeated).setFullReadDir();
        shardReq.dirId = req.ownerId;
        shardReq.flags = FULL_READ_DIR_BACKWARDS | FULL_READ_DIR_SAME_NAME | FULL_READ_DIR_CURRENT;
        shardReq.limit = 1;
        shardReq.startName = req.name;
        shardReq.startTime = 0; // we have current set
    }

    void afterLookupOldCreationTime(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            lookupOldCreationTime(true); // retry
        } else if (err == TernError::DIRECTORY_NOT_FOUND) {
            // the directory we looked into doesn't even exist anymore --
            // we've failed hard and we need to remove the inode.
            state.setExitError(err);
            rollback();
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            // there might be no existing edge
            const auto& fullReadDir = resp.getFullReadDir();
            ALWAYS_ASSERT(fullReadDir.results.els.size() < 2); // we have limit=1
            if (fullReadDir.results.els.size() == 0) {
                state.setOldCreationTime(0); // there was nothing present for this name
            } else {
                state.setOldCreationTime(fullReadDir.results.els[0].creationTime);
            }
            // keep going
            createLockedEdge();
        }
    }

    void createLockedEdge(bool repeated = false) {
        auto& shardReq = env.needsShard(MAKE_DIRECTORY_CREATE_LOCKED_EDGE, req.ownerId.shard(), repeated).setCreateLockedCurrentEdge();
        shardReq.dirId = req.ownerId;
        shardReq.targetId = state.dirId();
        shardReq.name = req.name;
        shardReq.oldCreationTime = state.oldCreationTime();
    }

    void afterCreateLockedEdge(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (createCurrentLockedEdgeRetry(err)) {
            createLockedEdge(true); // try again
        } else if (err == TernError::CANNOT_OVERRIDE_NAME) {
            // this happens when a file gets created between when we looked
            // up whether there was something else and now.
            state.setExitError(err);
            rollback();
        } else if (err == TernError::MISMATCHING_CREATION_TIME) {
            // lookup the old creation time again
            lookupOldCreationTime();
        } else {
            // We know we cannot get directory not found because we managed to lookup
            // old creation time.
            //
            // We also cannot get MISMATCHING_TARGET since we are the only one
            // creating locked edges, and transactions execute serially.
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            state.setCreationTime(resp.getCreateLockedCurrentEdge().creationTime);
            unlockEdge();
        }
    }

    void unlockEdge(bool repeated = false) {
        auto& shardReq = env.needsShard(MAKE_DIRECTORY_UNLOCK_EDGE, req.ownerId.shard(), repeated).setUnlockCurrentEdge();
        shardReq.dirId = req.ownerId;
        shardReq.name = req.name;
        shardReq.targetId = state.dirId();
        shardReq.wasMoved = false;
        shardReq.creationTime = state.creationTime();
    }

    void afterUnlockEdge(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT || err == TernError::MTIME_IS_TOO_RECENT) {
            // retry
            unlockEdge(true);
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            // We're done, record the parent relationship and finish
            {
                auto k = InodeIdKey::Static(state.dirId());
                auto v = InodeIdValue::Static(req.ownerId);
                ROCKS_DB_CHECKED(env.dbTxn.Put(env.parentCf, k.toSlice(), v.toSlice()));
            }
            auto& resp = env.finish().setMakeDirectory();
            resp.id = state.dirId();
            resp.creationTime = state.creationTime();
        }
    }

    void rollback(bool repeated = false) {
        // disown the child, it'll be collected by GC.
        auto& shardReq = env.needsShard(MAKE_DIRECTORY_ROLLBACK, state.dirId().shard(), repeated).setRemoveDirectoryOwner();
        shardReq.dirId = state.dirId();
        // we've just created this directory, it is empty, therefore the policy
        // is irrelevant.
        shardReq.info = defaultDirectoryInfo();
    }

    void afterRollback(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            rollback(true); // retry
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            env.finishWithError(state.exitError());
        }
    }
};

enum HardUnlinkDirectoryStep : uint8_t {
    HARD_UNLINK_DIRECTORY_REMOVE_INODE = 1,
};

// The only reason we have this here is for possible conflicts with RemoveDirectoryOwner,
// which might temporarily set the owner of a directory to NULL. Since in the current
// implementation we only ever have one transaction in flight in the CDC, we can just
// execute this.
struct HardUnlinkDirectoryStateMachine {
    StateMachineEnv& env;
    const HardUnlinkDirectoryReq& req;
    HardUnlinkDirectoryState state;

    HardUnlinkDirectoryStateMachine(StateMachineEnv& env_, const HardUnlinkDirectoryReq& req_, HardUnlinkDirectoryState state_):
        env(env_), req(req_), state(state_)
    {}

    void resume(const ShardRespContainer* resp) {
        if (env.txnStep == TXN_START) {
            removeInode();
            return;
        }
        if (unlikely(resp == nullptr)) { // we're resuming with no response
            switch (env.txnStep) {
                case HARD_UNLINK_DIRECTORY_REMOVE_INODE: removeInode(); break;
                default: throw TERN_EXCEPTION("bad step %s", env.txnStep);
            }
        } else {
            switch (env.txnStep) {
                case HARD_UNLINK_DIRECTORY_REMOVE_INODE: afterRemoveInode(*resp); break;
                default: throw TERN_EXCEPTION("bad step %s", env.txnStep);
            }
        }
    }

    void removeInode(bool repeated = false) {
        auto& shardReq = env.needsShard(HARD_UNLINK_DIRECTORY_REMOVE_INODE, req.dirId.shard(), repeated).setRemoveInode();
        shardReq.id = req.dirId;
    }

    void afterRemoveInode(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            removeInode(true); // try again
        } else if (
            err == TernError::DIRECTORY_NOT_FOUND || err == TernError::DIRECTORY_HAS_OWNER || err == TernError::DIRECTORY_NOT_EMPTY
        ) {
            env.finishWithError(err);
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            env.finish().setHardUnlinkDirectory();
        }
    }
};

enum RenameFileStep : uint8_t {
    RENAME_FILE_LOCK_OLD_EDGE = 1,
    RENAME_FILE_LOOKUP_OLD_CREATION_TIME = 2,
    RENAME_FILE_CREATE_NEW_LOCKED_EDGE = 3,
    RENAME_FILE_UNLOCK_NEW_EDGE = 4,
    RENAME_FILE_UNLOCK_OLD_EDGE = 5,
    RENAME_FILE_ROLLBACK = 6,
};

// Steps:
//
// 1. lock source current edge
// 2. lookup prev creation time for current target edge
// 3. create destination locked current target edge. if it fails because of bad creation time, go back to 2
// 4. unlock edge in step 3
// 5. unlock source target current edge, and soft unlink it
//
// If we fail at step 2 or 3, we need to roll back step 1. Steps 3 and 4 should never fail.
struct RenameFileStateMachine {
    StateMachineEnv& env;
    const RenameFileReq& req;
    RenameFileState state;

    RenameFileStateMachine(StateMachineEnv& env_, const RenameFileReq& req_, RenameFileState state_):
        env(env_), req(req_), state(state_)
    {}

    void resume(const ShardRespContainer* resp) {
        if (env.txnStep == TXN_START) {
            start();
            return;
        }
        if (unlikely(resp == nullptr)) { // we're resuming with no response
            switch (env.txnStep) {
                case RENAME_FILE_LOCK_OLD_EDGE: lockOldEdge(); break;
                case RENAME_FILE_LOOKUP_OLD_CREATION_TIME: lookupOldCreationTime(); break;
                case RENAME_FILE_CREATE_NEW_LOCKED_EDGE: createNewLockedEdge(); break;
                case RENAME_FILE_UNLOCK_NEW_EDGE: unlockNewEdge(); break;
                case RENAME_FILE_UNLOCK_OLD_EDGE: unlockOldEdge(); break;
                case RENAME_FILE_ROLLBACK: rollback(); break;
                default: throw TERN_EXCEPTION("bad step %s", env.txnStep);
            }
        } else {
            switch (env.txnStep) {
                case RENAME_FILE_LOCK_OLD_EDGE: afterLockOldEdge(*resp); break;
                case RENAME_FILE_LOOKUP_OLD_CREATION_TIME: afterLookupOldCreationTime(*resp); break;
                case RENAME_FILE_CREATE_NEW_LOCKED_EDGE: afterCreateNewLockedEdge(*resp); break;
                case RENAME_FILE_UNLOCK_NEW_EDGE: afterUnlockNewEdge(*resp); break;
                case RENAME_FILE_UNLOCK_OLD_EDGE: afterUnlockOldEdge(*resp); break;
                case RENAME_FILE_ROLLBACK: afterRollback(*resp); break;
                default: throw TERN_EXCEPTION("bad step %s", env.txnStep);
            }
        }
    }

    void start() {
        // We need this explicit check here because moving directories is more complicated,
        // and therefore we do it in another transaction type entirely.
        if (req.targetId.type() == InodeType::DIRECTORY) {
            env.finishWithError(TernError::TYPE_IS_NOT_DIRECTORY);
        } else if (req.oldOwnerId == req.newOwnerId) {
            env.finishWithError(TernError::SAME_DIRECTORIES);
        } else {
            lockOldEdge();
        }
    }

    void lockOldEdge(bool repeated = false) {
        auto& shardReq = env.needsShard(RENAME_FILE_LOCK_OLD_EDGE, req.oldOwnerId.shard(), repeated).setLockCurrentEdge();
        shardReq.dirId = req.oldOwnerId;
        shardReq.name = req.oldName;
        shardReq.targetId = req.targetId;
        shardReq.creationTime = req.oldCreationTime;
    }

    void afterLockOldEdge(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            lockOldEdge(true); // retry
        } else if (
            err == TernError::EDGE_NOT_FOUND || err == TernError::MISMATCHING_CREATION_TIME || err == TernError::DIRECTORY_NOT_FOUND
        ) {
            // We failed hard and we have nothing to roll back
            if (err == TernError::DIRECTORY_NOT_FOUND) {
                err = TernError::OLD_DIRECTORY_NOT_FOUND;
            }
            env.finishWithError(err);
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            lookupOldCreationTime();
        }
    }

    void lookupOldCreationTime(bool repeated = false) {
        auto& shardReq = env.needsShard(RENAME_FILE_LOOKUP_OLD_CREATION_TIME, req.newOwnerId.shard(), repeated).setFullReadDir();
        shardReq.dirId = req.newOwnerId;
        shardReq.flags = FULL_READ_DIR_BACKWARDS | FULL_READ_DIR_SAME_NAME | FULL_READ_DIR_CURRENT;
        shardReq.limit = 1;
        shardReq.startName = req.newName;
        shardReq.startTime = 0; // we have current set
    }

    void afterLookupOldCreationTime(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            lookupOldCreationTime(true); // retry
        } else if (err == TernError::DIRECTORY_NOT_FOUND) {
            // we've failed hard and we need to unlock the old edge.
            err = TernError::NEW_DIRECTORY_NOT_FOUND;
            state.setExitError(err);
            rollback();
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            // there might be no existing edge
            const auto& fullReadDir = resp.getFullReadDir();
            ALWAYS_ASSERT(fullReadDir.results.els.size() < 2); // we have limit=1
            if (fullReadDir.results.els.size() == 0) {
                state.setNewOldCreationTime(0); // there was nothing present for this name
            } else {
                state.setNewOldCreationTime(fullReadDir.results.els[0].creationTime);
            }
            // keep going
            createNewLockedEdge();
        }
    }

    void createNewLockedEdge(bool repeated = false) {
        auto& shardReq = env.needsShard(RENAME_FILE_CREATE_NEW_LOCKED_EDGE, req.newOwnerId.shard(), repeated).setCreateLockedCurrentEdge();
        shardReq.dirId = req.newOwnerId;
        shardReq.name = req.newName;
        shardReq.targetId = req.targetId;
        shardReq.oldCreationTime = state.newOldCreationTime();
    }

    void afterCreateNewLockedEdge(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (createCurrentLockedEdgeRetry(err)) {
            createNewLockedEdge(true); // retry
        } else if (err == TernError::MISMATCHING_CREATION_TIME) {
            // we need to lookup the creation time again.
            lookupOldCreationTime();
        } else if (err == TernError::CANNOT_OVERRIDE_NAME) {
            // we failed hard and we need to rollback
            state.setExitError(err);
            rollback();
        } else {
            state.setNewCreationTime(resp.getCreateLockedCurrentEdge().creationTime);
            unlockNewEdge();
        }
    }

    void unlockNewEdge(bool repeated = false) {
        auto& shardReq = env.needsShard(RENAME_FILE_UNLOCK_NEW_EDGE, req.newOwnerId.shard(), repeated).setUnlockCurrentEdge();
        shardReq.dirId = req.newOwnerId;
        shardReq.targetId = req.targetId;
        shardReq.name = req.newName;
        shardReq.wasMoved = false;
        shardReq.creationTime = state.newCreationTime();
    }

    void afterUnlockNewEdge(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            unlockNewEdge(true); // retry
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            unlockOldEdge();
        }
    }

    void unlockOldEdge(bool repeated = false) {
        // We're done creating the destination edge, now unlock the source, marking it as moved
        auto& shardReq = env.needsShard(RENAME_FILE_UNLOCK_OLD_EDGE, req.oldOwnerId.shard(), repeated).setUnlockCurrentEdge();
        shardReq.dirId = req.oldOwnerId;
        shardReq.targetId = req.targetId;
        shardReq.name = req.oldName;
        shardReq.wasMoved = true;
        shardReq.creationTime = req.oldCreationTime;
    }

    void afterUnlockOldEdge(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            unlockOldEdge(true); // retry
        } else {
            // This can only be because of repeated calls from here: we have the edge locked,
            // and only the CDC does changes.
            // TODO it would be cleaner to verify this with a lookup
            ALWAYS_ASSERT(err == TernError::NO_ERROR || err == TernError::EDGE_NOT_FOUND);
            // we're finally done
            auto& resp = env.finish().setRenameFile();
            resp.creationTime = state.newCreationTime();
        }
    }

    void rollback(bool repeated = false) {
        auto& shardReq = env.needsShard(RENAME_FILE_ROLLBACK, req.oldOwnerId.shard(), repeated).setUnlockCurrentEdge();
        shardReq.dirId = req.oldOwnerId;
        shardReq.name = req.oldName;
        shardReq.targetId = req.targetId;
        shardReq.wasMoved = false;
        shardReq.creationTime = state.newCreationTime();
    }

    void afterRollback(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            rollback(true); // retry
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            env.finishWithError(state.exitError());
        }
    }
};

enum SoftUnlinkDirectoryStep : uint8_t {
    SOFT_UNLINK_DIRECTORY_LOCK_EDGE = 1,
    SOFT_UNLINK_DIRECTORY_STAT = 2,
    SOFT_UNLINK_DIRECTORY_REMOVE_OWNER = 3,
    SOFT_UNLINK_DIRECTORY_UNLOCK_EDGE = 4,
    SOFT_UNLINK_DIRECTORY_ROLLBACK = 5,
};

// Steps:
//
// 1. Lock edge going into the directory to remove. This prevents things making
//     making it snapshot or similar in the meantime.
// 2. Resolve the directory info, since we'll need to store it when we remove the directory owner.
// 3. Remove directory owner from directory that we want to remove. This will fail if there still
//     are current edges there.
// 4. Unlock edge going into the directory, making it snapshot.
//
// If 2 or 3 fail, we need to roll back the locking, without making the edge snapshot.
struct SoftUnlinkDirectoryStateMachine {
    StateMachineEnv& env;
    const SoftUnlinkDirectoryReq& req;
    SoftUnlinkDirectoryState state;
    DirectoryInfo info;

    SoftUnlinkDirectoryStateMachine(StateMachineEnv& env_, const SoftUnlinkDirectoryReq& req_, SoftUnlinkDirectoryState state_):
        env(env_), req(req_), state(state_)
    {}

    void resume(const ShardRespContainer* resp) {
        if (env.txnStep == TXN_START) {
            start();
            return;
        }
        if (unlikely(resp == nullptr)) { // we're resuming with no response
            switch (env.txnStep) {
                case SOFT_UNLINK_DIRECTORY_LOCK_EDGE: lockEdge(); break;
                case SOFT_UNLINK_DIRECTORY_STAT: stat(); break;
                // We don't persist the directory info, so we need to restart the stating when resuming
                case SOFT_UNLINK_DIRECTORY_REMOVE_OWNER: stat(); break;
                case SOFT_UNLINK_DIRECTORY_UNLOCK_EDGE: unlockEdge(); break;
                case SOFT_UNLINK_DIRECTORY_ROLLBACK: rollback(); break;
                default: throw TERN_EXCEPTION("bad step %s", env.txnStep);
            }
        } else {
            switch (env.txnStep) {
                case SOFT_UNLINK_DIRECTORY_LOCK_EDGE: afterLockEdge(*resp); break;
                case SOFT_UNLINK_DIRECTORY_STAT: afterStat(*resp); break;
                case SOFT_UNLINK_DIRECTORY_REMOVE_OWNER: afterRemoveOwner(*resp); break;
                case SOFT_UNLINK_DIRECTORY_UNLOCK_EDGE: afterUnlockEdge(*resp); break;
                case SOFT_UNLINK_DIRECTORY_ROLLBACK: afterRollback(*resp); break;
                default: throw TERN_EXCEPTION("bad step %s", env.txnStep);
            }
        }
    }

    void start() {
        if (req.targetId.type() != InodeType::DIRECTORY) {
            env.finishWithError(TernError::TYPE_IS_NOT_DIRECTORY);
        } else {
            lockEdge();
        }
    }

    void lockEdge(bool repeated = false) {
        auto& shardReq = env.needsShard(SOFT_UNLINK_DIRECTORY_LOCK_EDGE, req.ownerId.shard(), repeated).setLockCurrentEdge();
        shardReq.dirId = req.ownerId;
        shardReq.name = req.name;
        shardReq.targetId = req.targetId;
        shardReq.creationTime = req.creationTime;
    }

    void afterLockEdge(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            lockEdge(true);
        } else if (err == TernError::MISMATCHING_CREATION_TIME || err == TernError::EDGE_NOT_FOUND || err == TernError::DIRECTORY_NOT_FOUND) {
            LOG_INFO(env.env, "failed locking edge in soft unlink for req: %s with err: %s", req, err);
            env.finishWithError(err); // no rollback to be done
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            state.setStatDirId(req.targetId);
            stat();
        }
    }

    void stat(bool repeated = false) {
        auto& shardReq = env.needsShard(SOFT_UNLINK_DIRECTORY_STAT, state.statDirId().shard(), repeated).setStatDirectory();
        shardReq.id = state.statDirId();
    }

    void afterStat(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            stat(true); // retry
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            const auto& statResp = resp.getStatDirectory();
            // insert tags
            for (const auto& newEntry : statResp.info.entries.els) {
                if (std::find(REQUIRED_DIR_INFO_TAGS.begin(), REQUIRED_DIR_INFO_TAGS.end(), newEntry.tag) == REQUIRED_DIR_INFO_TAGS.end() ||
                    std::find(info.entries.els.begin(), info.entries.els.end(), newEntry) != info.entries.els.end()) {
                    // skip non required tags or already present tags
                    continue;
                }
                info.entries.els.emplace_back(newEntry);
            }
            if (info.entries.els.size() == REQUIRED_DIR_INFO_TAGS.size()) { // we've found everything
                auto& shardReq = env.needsShard(SOFT_UNLINK_DIRECTORY_REMOVE_OWNER, req.targetId.shard(), false).setRemoveDirectoryOwner();
                shardReq.dirId = req.targetId;
                shardReq.info = info;
            } else {
                ALWAYS_ASSERT(statResp.owner != NULL_INODE_ID);
                // keep walking upwards
                state.setStatDirId(statResp.owner);
                stat();
            }
        }
    }

    void afterRemoveOwner(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            // we don't want to keep the dir info around start again from the last stat
            stat();
        } else if (err == TernError::DIRECTORY_NOT_EMPTY) {
            state.setExitError(err);
            rollback();
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR, "Unexpected error when removing owner, ownerId=%s name=%s creationTime=%s targetId=%s: %s", req.ownerId, GoLangQuotedStringFmt(req.name.ref().data(), req.name.ref().size()), req.creationTime, req.targetId, err);
            unlockEdge();
        }
    }

    void unlockEdge(bool repeated = false) {
        auto& shardReq = env.needsShard(SOFT_UNLINK_DIRECTORY_UNLOCK_EDGE, req.ownerId.shard(), repeated).setUnlockCurrentEdge();
        shardReq.dirId = req.ownerId;
        shardReq.name = req.name;
        shardReq.targetId = req.targetId;
        // Note that here we used wasMoved even if the subsequent
        // snapshot edge will be non-owned, since we're dealing with
        // a directory, rather than a file.
        shardReq.wasMoved = true;
        shardReq.creationTime = req.creationTime;
    }

    void afterUnlockEdge(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            unlockEdge(true);
        } else {
            // This can only be because of repeated calls from here: we have the edge locked,
            // and only the CDC does changes.
            // TODO it would be cleaner to verify this with a lookup
            ALWAYS_ASSERT(err == TernError::NO_ERROR || err == TernError::EDGE_NOT_FOUND);
            auto& cdcResp = env.finish().setSoftUnlinkDirectory();
            // Update parent map
            {
                auto k = InodeIdKey::Static(req.targetId);
                ROCKS_DB_CHECKED(env.dbTxn.Delete(env.parentCf, k.toSlice()));
            }
        }
    }

    void rollback(bool repeated = false) {
        auto& shardReq = env.needsShard(SOFT_UNLINK_DIRECTORY_ROLLBACK, req.ownerId.shard(), repeated).setUnlockCurrentEdge();
        shardReq.dirId = req.ownerId;
        shardReq.name = req.name;
        shardReq.targetId = req.targetId;
        shardReq.wasMoved = false;
        shardReq.creationTime = req.creationTime;
    }

    void afterRollback(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            rollback(true);
        } else {
            // This can only be because of repeated calls from here: we have the edge locked,
            // and only the CDC does changes.
            // TODO it would be cleaner to verify this with a lookup
            ALWAYS_ASSERT(err == TernError::NO_ERROR || err == TernError::EDGE_NOT_FOUND);
            env.finishWithError(state.exitError());
        }
    }
};

enum RenameDirectoryStep : uint8_t {
    RENAME_DIRECTORY_LOCK_OLD_EDGE = 1,
    RENAME_DIRECTORY_LOOKUP_OLD_CREATION_TIME = 2,
    RENAME_DIRECTORY_CREATE_LOCKED_NEW_EDGE = 3,
    RENAME_DIRECTORY_UNLOCK_NEW_EDGE = 4,
    RENAME_DIRECTORY_UNLOCK_OLD_EDGE = 5,
    RENAME_DIRECTORY_SET_OWNER = 6,
    RENAME_DIRECTORY_ROLLBACK = 7,
};

// Steps:
//
// 1. Make sure there's no loop by traversing the parents
// 2. Lock old edge
// 3. Lookup old creation time for the edge
// 4. Create and lock the new edge
// 5. Unlock the new edge
// 6. Unlock and unlink the old edge
// 7. Update the owner of the moved directory to the new directory
//
// If we fail at step 3 or 4, we need to unlock the edge we locked at step 2. Step 5 and 6
// should never fail.
struct RenameDirectoryStateMachine {
    StateMachineEnv& env;
    const RenameDirectoryReq& req;
    RenameDirectoryState state;

    RenameDirectoryStateMachine(StateMachineEnv& env_, const RenameDirectoryReq& req_, RenameDirectoryState state_):
        env(env_), req(req_), state(state_)
    {}

    void resume(const ShardRespContainer* resp) {
        if (env.txnStep == TXN_START) {
            start();
            return;
        }
        if (unlikely(resp == nullptr)) { // we're resuming with no response
            switch (env.txnStep) {
                case RENAME_DIRECTORY_LOCK_OLD_EDGE: lockOldEdge(); break;
                case RENAME_DIRECTORY_LOOKUP_OLD_CREATION_TIME: lookupOldCreationTime(); break;
                case RENAME_DIRECTORY_CREATE_LOCKED_NEW_EDGE: createLockedNewEdge(); break;
                case RENAME_DIRECTORY_UNLOCK_NEW_EDGE: unlockNewEdge(); break;
                case RENAME_DIRECTORY_UNLOCK_OLD_EDGE: unlockOldEdge(); break;
                case RENAME_DIRECTORY_SET_OWNER: setOwner(); break;
                case RENAME_DIRECTORY_ROLLBACK: rollback(); break;
                default: throw TERN_EXCEPTION("bad step %s", env.txnStep);
            }
        } else {
            switch (env.txnStep) {
                case RENAME_DIRECTORY_LOCK_OLD_EDGE: afterLockOldEdge(*resp); break;
                case RENAME_DIRECTORY_LOOKUP_OLD_CREATION_TIME: afterLookupOldCreationTime(*resp); break;
                case RENAME_DIRECTORY_CREATE_LOCKED_NEW_EDGE: afterCreateLockedEdge(*resp); break;
                case RENAME_DIRECTORY_UNLOCK_NEW_EDGE: afterUnlockNewEdge(*resp); break;
                case RENAME_DIRECTORY_UNLOCK_OLD_EDGE: afterUnlockOldEdge(*resp); break;
                case RENAME_DIRECTORY_SET_OWNER: afterSetOwner(*resp); break;
                case RENAME_DIRECTORY_ROLLBACK: afterRollback(*resp); break;
                default: throw TERN_EXCEPTION("bad step %s", env.txnStep);
            }
        }
    }

    // Check that changing this parent-child relationship wouldn't create
    // loops in directory structure.
    TernError loopCheck() const {
        std::unordered_set<InodeId> visited;
        InodeId cursor = req.targetId;
        for (;;) {
            if (visited.count(cursor) > 0) {
                LOG_INFO(env.env, "Re-encountered %s in loop check, will return false", cursor);
                return TernError::LOOP_IN_DIRECTORY_RENAME;
            }
            LOG_DEBUG(env.env, "Performing loop check for %s", cursor);
            visited.insert(cursor);
            if (cursor == req.targetId) {
                cursor = req.newOwnerId;
            } else {
                auto k = InodeIdKey::Static(cursor);
                std::string v;
                auto status = env.dbTxn.Get({}, env.parentCf, k.toSlice(), &v);
                if (status == rocksdb::Status::NotFound()) {
                    return TernError::DIRECTORY_NOT_FOUND;
                }
                ROCKS_DB_CHECKED(status);
                cursor = ExternalValue<InodeIdValue>(v)().id();
            }
            if (cursor == ROOT_DIR_INODE_ID) {
                break;
            }
        }
        return TernError::NO_ERROR;
    }

    void start() {
        if (req.targetId.type() != InodeType::DIRECTORY) {
            env.finishWithError(TernError::TYPE_IS_NOT_DIRECTORY);
        } else if (req.oldOwnerId == req.newOwnerId) {
            env.finishWithError(TernError::SAME_DIRECTORIES);
        } else if (auto err = loopCheck(); err != TernError::NO_ERROR) {
            // First, check if we'd create a loop
            env.finishWithError(err);
        } else {
            // Now, actually start by locking the old edge
            lockOldEdge();
        }
    }

    void lockOldEdge(bool repeated = false) {
        auto& shardReq = env.needsShard(RENAME_DIRECTORY_LOCK_OLD_EDGE, req.oldOwnerId.shard(), repeated).setLockCurrentEdge();
        shardReq.dirId = req.oldOwnerId;
        shardReq.name = req.oldName;
        shardReq.targetId = req.targetId;
        shardReq.creationTime = req.oldCreationTime;
    }

    void afterLockOldEdge(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            lockOldEdge(true); // retry
        } else if (
            err == TernError::DIRECTORY_NOT_FOUND || err == TernError::EDGE_NOT_FOUND || err == TernError::MISMATCHING_CREATION_TIME
        ) {
            if (err == TernError::DIRECTORY_NOT_FOUND) {
                err = TernError::OLD_DIRECTORY_NOT_FOUND;
            }
            env.finishWithError(err);
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            lookupOldCreationTime();
        }
    }

    void lookupOldCreationTime(bool repeated = false) {
        auto& shardReq = env.needsShard(RENAME_DIRECTORY_LOOKUP_OLD_CREATION_TIME, req.newOwnerId.shard(), repeated).setFullReadDir();
        shardReq.dirId = req.newOwnerId;
        shardReq.flags = FULL_READ_DIR_BACKWARDS | FULL_READ_DIR_SAME_NAME | FULL_READ_DIR_CURRENT;
        shardReq.limit = 1;
        shardReq.startName = req.newName;
        shardReq.startTime = 0; // we have current set
    }

    void afterLookupOldCreationTime(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            lookupOldCreationTime(true); // retry
        } else if (err == TernError::DIRECTORY_NOT_FOUND) {
            // we've failed hard and we need to unlock the old edge.
            state.setExitError(err);
            rollback();
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            // there might be no existing edge
            const auto& fullReadDir = resp.getFullReadDir();
            ALWAYS_ASSERT(fullReadDir.results.els.size() < 2); // we have current and same name
            if (fullReadDir.results.els.size() == 0) {
                state.setNewOldCreationTime(0); // there was nothing present for this name
            } else {
                state.setNewOldCreationTime(fullReadDir.results.els[0].creationTime);
            }
            // keep going
            createLockedNewEdge();
        }
    }

    void createLockedNewEdge(bool repeated = false) {
        auto& shardReq = env.needsShard(RENAME_DIRECTORY_CREATE_LOCKED_NEW_EDGE, req.newOwnerId.shard(), repeated).setCreateLockedCurrentEdge();
        shardReq.dirId = req.newOwnerId;
        shardReq.name = req.newName;
        shardReq.targetId = req.targetId;
        shardReq.oldCreationTime = state.newOldCreationTime();
    }

    void afterCreateLockedEdge(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (createCurrentLockedEdgeRetry(err)) {
            createLockedNewEdge(true);
        } else if (err == TernError::MISMATCHING_CREATION_TIME) {
            // we need to lookup the creation time again.
            lookupOldCreationTime();
        } else if (err == TernError::CANNOT_OVERRIDE_NAME || err == TernError::DIRECTORY_NOT_FOUND) {
            state.setExitError(err);
            rollback();
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            state.setNewCreationTime(resp.getCreateLockedCurrentEdge().creationTime);
            unlockNewEdge();
        }
    }

    void unlockNewEdge(bool repeated = false) {
        auto& shardReq = env.needsShard(RENAME_DIRECTORY_UNLOCK_NEW_EDGE, req.newOwnerId.shard(), repeated).setUnlockCurrentEdge();
        shardReq.dirId = req.newOwnerId;
        shardReq.name = req.newName;
        shardReq.targetId = req.targetId;
        shardReq.wasMoved = false;
        shardReq.creationTime = state.newCreationTime();
    }

    void afterUnlockNewEdge(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            unlockNewEdge(true);
        } else if (err == TernError::EDGE_NOT_FOUND) {
            // This can only be because of repeated calls from here: we have the edge locked,
            // and only the CDC does changes.
            // TODO it would be cleaner to verify this with a lookup
            unlockOldEdge();
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            unlockOldEdge();
        }
    }

    void unlockOldEdge(bool repeated = false) {
        auto& shardReq = env.needsShard(RENAME_DIRECTORY_UNLOCK_OLD_EDGE, req.oldOwnerId.shard(), repeated).setUnlockCurrentEdge();
        shardReq.dirId = req.oldOwnerId;
        shardReq.name = req.oldName;
        shardReq.targetId = req.targetId;
        shardReq.wasMoved = true;
        shardReq.creationTime = req.oldCreationTime;
    }

    void afterUnlockOldEdge(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            unlockOldEdge(true);
        } else if (err == TernError::EDGE_NOT_FOUND) {
            // This can only be because of repeated calls from here: we have the edge locked,
            // and only the CDC does changes.
            // TODO it would be cleaner to verify this with a lookup
            setOwner();
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            setOwner();
        }
    }

    void setOwner(bool repeated = false) {
        auto& shardReq = env.needsShard(RENAME_DIRECTORY_SET_OWNER, req.targetId.shard(), repeated).setSetDirectoryOwner();
        shardReq.ownerId = req.newOwnerId;
        shardReq.dirId = req.targetId;
    }

    void afterSetOwner(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            setOwner(true);
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            auto& resp = env.finish().setRenameDirectory();
            resp.creationTime = state.newCreationTime();
            // update cache
            {
                auto k = InodeIdKey::Static(req.targetId);
                auto v = InodeIdValue::Static(req.newOwnerId);
                ROCKS_DB_CHECKED(env.dbTxn.Put(env.parentCf, k.toSlice(), v.toSlice()));
            }
        }
    }

    void rollback(bool repeated = false) {
        auto& shardReq = env.needsShard(RENAME_DIRECTORY_ROLLBACK, req.oldOwnerId.shard(), repeated).setUnlockCurrentEdge();
        shardReq.dirId = req.oldOwnerId;
        shardReq.name = req.oldName;
        shardReq.targetId = req.targetId;
        shardReq.wasMoved = false;
        shardReq.creationTime = req.oldCreationTime;
    }

    void afterRollback(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            rollback(true);
        } else {
            env.finishWithError(state.exitError());
        }
    }
};

enum CrossShardHardUnlinkFileStep : uint8_t {
    CROSS_SHARD_HARD_UNLINK_FILE_REMOVE_EDGE = 1,
    CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT = 2,
};

// Steps:
//
// 1. Remove owning edge in one shard
// 2. Make file transient in other shard
//
// Step 2 cannot fail.
struct CrossShardHardUnlinkFileStateMachine {
    StateMachineEnv& env;
    const CrossShardHardUnlinkFileReq& req;
    CrossShardHardUnlinkFileState state;

    CrossShardHardUnlinkFileStateMachine(StateMachineEnv& env_, const CrossShardHardUnlinkFileReq& req_, CrossShardHardUnlinkFileState state_):
        env(env_), req(req_), state(state_)
    {}

    void resume(const ShardRespContainer* resp) {
        if (env.txnStep == TXN_START) {
            start();
            return;
        }
        if (unlikely(resp == nullptr)) { // we're resuming with no response
            switch (env.txnStep) {
                case CROSS_SHARD_HARD_UNLINK_FILE_REMOVE_EDGE: removeEdge(); break;
                case CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT: makeTransient(); break;
                default: throw TERN_EXCEPTION("bad step %s", env.txnStep);
            }
        } else {
            switch (env.txnStep) {
                case CROSS_SHARD_HARD_UNLINK_FILE_REMOVE_EDGE: afterRemoveEdge(*resp); break;
                case CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT: afterMakeTransient(*resp); break;
                default: throw TERN_EXCEPTION("bad step %s", env.txnStep);
            }
        }
    }

    void start() {
        if (req.ownerId.shard() == req.targetId.shard()) {
            env.finishWithError(TernError::SAME_SHARD);
        } else if (req.targetId.type() == InodeType::DIRECTORY) {
            env.finishWithError(TernError::TYPE_IS_DIRECTORY);
        } else {
            removeEdge();
        }
    }

    void removeEdge(bool repeated = false) {
        auto& shardReq = env.needsShard(CROSS_SHARD_HARD_UNLINK_FILE_REMOVE_EDGE, req.ownerId.shard(), repeated).setRemoveOwnedSnapshotFileEdge();
        shardReq.ownerId = req.ownerId;
        shardReq.targetId = req.targetId;
        shardReq.name = req.name;
        shardReq.creationTime = req.creationTime;
    }

    void afterRemoveEdge(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT || err == TernError::MTIME_IS_TOO_RECENT) {
            removeEdge(true);
        } else if (err == TernError::DIRECTORY_NOT_FOUND) {
            env.finishWithError(err);
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR);
            makeTransient();
        }
    }

    void makeTransient(bool repeated = false) {
        auto& shardReq = env.needsShard(CROSS_SHARD_HARD_UNLINK_FILE_MAKE_TRANSIENT, req.targetId.shard(), repeated).setMakeFileTransient();
        shardReq.id = req.targetId;
        shardReq.note = req.name;
    }

    void afterMakeTransient(const ShardRespContainer& resp) {
        auto err = resp.kind() == ShardMessageKind::ERROR ? resp.getError() : TernError::NO_ERROR;
        if (err == TernError::TIMEOUT) {
            makeTransient(true);
        } else {
            ALWAYS_ASSERT(err == TernError::NO_ERROR || err == TernError::FILE_NOT_FOUND);
            env.finish().setCrossShardHardUnlinkFile();
        }
    }
};

void CDCShardResp::pack(BincodeBuf& buf) const {
    buf.packScalar(txnId.x);
    checkPoint.pack(buf);
    resp.pack(buf);
}

void CDCShardResp::unpack(BincodeBuf& buf) {
    txnId.x = buf.unpackScalar<uint64_t>();
    checkPoint.unpack(buf);
    resp.unpack(buf);
}

size_t CDCShardResp::packedSize() const {
    return sizeof(uint64_t) + checkPoint.packedSize() + resp.packedSize();
}

void CDCLogEntry::prepareLogEntries(std::vector<CDCReqContainer>& cdcReqs, std::vector<CDCShardResp>& shardResps, size_t maxPackedSize, std::vector<CDCLogEntry>& entriesOut) {
    size_t usedSize = std::numeric_limits<size_t>::max();
    CDCLogEntry* curEntry{nullptr};
    for (auto& shardResp : shardResps) {
        auto respSize = shardResp.packedSize();
        ALWAYS_ASSERT(respSize < maxPackedSize);
        if (unlikely(maxPackedSize - respSize < usedSize)) {
            curEntry = &entriesOut.emplace_back();
            usedSize = curEntry->packedSize();
        }
        curEntry->_shardResps.emplace_back(std::move(shardResp));
        usedSize += respSize;
        ALWAYS_ASSERT(usedSize <= maxPackedSize);
    }
    for (auto& cdcReq : cdcReqs) {
        auto reqSize = cdcReq.packedSize();
        ALWAYS_ASSERT(reqSize < maxPackedSize);
        if (unlikely(maxPackedSize - reqSize < usedSize)) {
            curEntry = &entriesOut.emplace_back();
            usedSize = curEntry->packedSize();
        }
        curEntry->_cdcReqs.emplace_back(std::move(cdcReq));
        usedSize += reqSize;
        ALWAYS_ASSERT(usedSize <= maxPackedSize);
    }
    cdcReqs.clear();
    shardResps.clear();
}

CDCLogEntry CDCLogEntry::prepareBootstrapEntry() {
    CDCLogEntry entry;
    entry._bootstrapEntry = true;
    return entry;
}

void CDCLogEntry::pack(BincodeBuf& buf) const {
    buf.packScalar<bool>(_bootstrapEntry);
    buf.packScalar<uint32_t>(_cdcReqs.size());
    for (auto& cdcReq : _cdcReqs) {
        cdcReq.pack(buf);
    }
    buf.packScalar<uint32_t>(_shardResps.size());
    for (auto& shardResp : _shardResps) {
        shardResp.pack(buf);
    }
}

void CDCLogEntry::unpack(BincodeBuf& buf) {
    _bootstrapEntry = buf.unpackScalar<bool>();
    _cdcReqs.resize(buf.unpackScalar<uint32_t>());
    for (auto& cdcReq : _cdcReqs) {
        cdcReq.unpack(buf);
    }
    _shardResps.resize(buf.unpackScalar<uint32_t>());
    for (auto& shardResp : _shardResps) {
        shardResp.unpack(buf);
    }
}

size_t CDCLogEntry::packedSize() const {
    size_t size{1 + 2 * sizeof(uint32_t)};
    for (auto& cdcReq : _cdcReqs) {
        size += cdcReq.packedSize();
    }
    for (auto& shardResp : _shardResps) {
        size += shardResp.packedSize();
    }
    return size;
}

struct CDCDBImpl {
    Env _env;

    // The reason why we insist in storing everything in RocksDB is that we can do
    // everything in a single transaction, so it's easier to reason about atomic
    // modifications. _dirsToTxnsCf for example would be much simpler as a
    // simple unordered_map.
    //
    // It also has the nice advantage that we don't need to reconstruct the state
    // when starting up, it's all already there.

    // The general pattern in this file is to use a txn for everything,
    // hence this naming.
    rocksdb::OptimisticTransactionDB* _dbDontUseDirectly;

    rocksdb::ColumnFamilyHandle* _defaultCf;
    rocksdb::ColumnFamilyHandle* _parentCf;
    rocksdb::ColumnFamilyHandle* _enqueuedCf; // V1, txnId -> CDC req, only for executing or waiting to be executed requests
    rocksdb::ColumnFamilyHandle* _executingCf; // V1, txnId -> CDC state machine, for requests that are executing
    // V1, data structure storing a dir to txn ids mapping:
    // InodeId -> txnId -- sentinel telling us what the first txn in line is. If none, zero.
    // we need the sentinel to skip over tombstones quickly.
    // InodeId, txnId set with the queue
    rocksdb::ColumnFamilyHandle* _dirsToTxnsCf;
    // legacy
    rocksdb::ColumnFamilyHandle* _reqQueueCfLegacy; // V0, txnId -> CDC req, for all the requests (including historical)

    AssertiveLock _processLock;

    std::shared_ptr<rocksdb::Statistics> _dbStatistics;
    std::string _dbStatisticsFile;

    // ----------------------------------------------------------------
    // initialization

    CDCDBImpl() = delete;
    CDCDBImpl& operator=(const CDCDBImpl&) = delete;

    CDCDBImpl(Logger& logger, std::shared_ptr<XmonAgent>& xmon, SharedRocksDB& sharedDb) : _env(logger, xmon, "cdc_db") {
        _defaultCf = sharedDb.getCF(rocksdb::kDefaultColumnFamilyName);
        _reqQueueCfLegacy = sharedDb.getCF("reqQueue");
        _parentCf = sharedDb.getCF("parent");
        _enqueuedCf = sharedDb.getCF("enqueued");
        _executingCf = sharedDb.getCF("executing");
        _dirsToTxnsCf = sharedDb.getCF("dirsToTxns");
        _dbDontUseDirectly = sharedDb.transactionDB();

        _initDb();
    }

    // Getting/setting txn ids from our txn ids keys

    #define TXN_ID_SETTER_GETTER(key, getterName, setterName) \
        uint64_t getterName(rocksdb::Transaction& dbTxn) { \
            std::string v; \
            ROCKS_DB_CHECKED(dbTxn.Get({}, _defaultCf, cdcMetadataKey(&key), &v)); \
            ExternalValue<U64Value> txnIdV(v); \
            return txnIdV().u64(); \
        } \
        void setterName(rocksdb::Transaction& dbTxn, uint64_t x) { \
            auto v = U64Value::Static(x); \
            ROCKS_DB_CHECKED(dbTxn.Put(_defaultCf, cdcMetadataKey(&key), v.toSlice())); \
        }

    TXN_ID_SETTER_GETTER(LAST_TXN_KEY, _lastTxn, _setLastTxn)
    TXN_ID_SETTER_GETTER(EXECUTING_TXN_KEY, _executingTxnLegacy, _setExecutingTxnLegacy)

    #undef TXN_ID_SETTER_GETTER

    // returns -1 if the version key was not set
    int64_t _version(rocksdb::Transaction& dbTxn) {
        std::string value;
        auto status = dbTxn.Get({}, _defaultCf, cdcMetadataKey(&VERSION_KEY), &value);
        if (status.IsNotFound()) {
            return -1;
        } else {
            ROCKS_DB_CHECKED(status);
            ExternalValue<U64Value> vV(value);
            return vV().u64();
        }
    }

    void _setVersion(rocksdb::Transaction& dbTxn, uint64_t version) {
        auto v = U64Value::Static(version);
        ROCKS_DB_CHECKED(dbTxn.Put(_defaultCf, cdcMetadataKey(&VERSION_KEY), v.toSlice()));
    }

    void _initDbV0(rocksdb::Transaction& dbTxn) {
        LOG_INFO(_env, "initializing V0 db");
        const auto keyExists = [&dbTxn](rocksdb::ColumnFamilyHandle* cf, const rocksdb::Slice& key) -> bool {
            std::string value;
            auto status = dbTxn.Get({}, cf, key, &value);
            if (status.IsNotFound()) {
                return false;
            } else {
                ROCKS_DB_CHECKED(status);
                return true;
            }
        };

        if (!keyExists(_defaultCf, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY))) {
            LOG_INFO(_env, "initializing next directory id");
            auto id = InodeIdValue::Static(InodeId::FromU64(ROOT_DIR_INODE_ID.u64 + 1));
            ROCKS_DB_CHECKED(dbTxn.Put({}, cdcMetadataKey(&NEXT_DIRECTORY_ID_KEY), id.toSlice()));
        }

        const auto initZeroValue = [this, &keyExists, &dbTxn](const std::string& what, const CDCMetadataKey& key) {
            if (!keyExists(_defaultCf, cdcMetadataKey(&key))) {
                LOG_INFO(_env, "initializing %s", what);
                StaticValue<U64Value> v;
                v().setU64(0);
                ROCKS_DB_CHECKED(dbTxn.Put({}, cdcMetadataKey(&key), v.toSlice()));
            }
        };

        initZeroValue("last txn", LAST_TXN_KEY);
        initZeroValue("first txn in queue", FIRST_TXN_IN_QUEUE_KEY);
        initZeroValue("last txn in queue", LAST_TXN_IN_QUEUE_KEY);
        initZeroValue("executing txn", EXECUTING_TXN_KEY);
        initZeroValue("last applied log index", LAST_APPLIED_LOG_ENTRY_KEY);
    }

    void _initDbV1(rocksdb::Transaction& dbTxn) {
        LOG_INFO(_env, "initializing V1 db");
        // Pick up the executing txn, if any, and move it to the executing CF.
        // We preserve the executing txn to preserve integrity.
        {
            uint64_t executingTxn = _executingTxnLegacy(dbTxn);
            if (executingTxn != 0) {
                LOG_INFO(_env, "migrating txn %s", executingTxn);
                // _reqQueueCf -> _enqueuedCf
                auto txnK = CDCTxnIdKey::Static(CDCTxnId(executingTxn));
                std::string reqV;
                ROCKS_DB_CHECKED(dbTxn.Get({}, _reqQueueCfLegacy, txnK.toSlice(), &reqV));
                CDCReqContainer req;
                bincodeFromRocksValue(reqV, req);
                ROCKS_DB_CHECKED(dbTxn.Put(_enqueuedCf, txnK.toSlice(), reqV));
                // EXECUTING_TXN_KEY -> _executingCf
                std::string txnStateV;
                ROCKS_DB_CHECKED(dbTxn.Get({}, _defaultCf, cdcMetadataKey(&EXECUTING_TXN_STATE_KEY), &txnStateV));
                ROCKS_DB_CHECKED(dbTxn.Put(_executingCf, txnK.toSlice(), txnStateV));
                // Add to _dirsToTxnsCf, will lock since things are empty
                _addToDirsToTxns(dbTxn, txnK().id(), req);
            }
        }

        // Throw away everything legacy. The clients will just retry.
        ROCKS_DB_CHECKED(dbTxn.Delete(cdcMetadataKey(&FIRST_TXN_IN_QUEUE_KEY)));
        ROCKS_DB_CHECKED(dbTxn.Delete(cdcMetadataKey(&LAST_TXN_IN_QUEUE_KEY)));
        ROCKS_DB_CHECKED(dbTxn.Delete(cdcMetadataKey(&EXECUTING_TXN_KEY)));
        ROCKS_DB_CHECKED(dbTxn.Delete(cdcMetadataKey(&EXECUTING_TXN_STATE_KEY)));
        // We delete the _reqQueueCfLegacy CF outside the transaction, because it's just too expensive
        // to delete here one-by-one.
    }

    void _initDb() {
        rocksdb::WriteOptions options;
        options.sync = true;
        std::unique_ptr<rocksdb::Transaction> dbTxn(_dbDontUseDirectly->BeginTransaction(options));

        if (_version(*dbTxn) == -1) {
            _initDbV0(*dbTxn);
            _setVersion(*dbTxn, 0);
        }
        if (_version(*dbTxn) == 0) {
            _initDbV1(*dbTxn);
            _setVersion(*dbTxn, 1);
        }

        commitTransaction(*dbTxn);

        // This means that it'll be recreated and dropped each time, but that's OK.
        _dbDontUseDirectly->DropColumnFamily(_reqQueueCfLegacy);

        LOG_INFO(_env, "DB initialization done");
    }

    // ----------------------------------------------------------------
    // retrying txns

    void commitTransaction(rocksdb::Transaction& txn) {
        XmonNCAlert alert(10_sec);
        for (;;) {
            auto status = txn.Commit();
            if (likely(status.ok())) {
                _env.clearAlert(alert);
                return;
            }
            if (likely(status.IsTryAgain())) {
                _env.updateAlert(alert, "got try again in CDC transaction, will sleep for a second and try again");
                (1_sec).sleepRetry();
                continue;
            }
            // We don't expect any other kind of error. The docs state:
            //
            //     If this transaction was created by an OptimisticTransactionDB(),
            //     Status::Busy() may be returned if the transaction could not guarantee
            //     that there are no write conflicts.  Status::TryAgain() may be returned
            //     if the memtable history size is not large enough
            //      (See max_write_buffer_size_to_maintain).
            //
            // However we never run transactions concurrently. So we should never get busy.
            //
            // This is just a way to throw the right exception.
            ROCKS_DB_CHECKED(status);
        }
    }

    // Processing
    // ----------------------------------------------------------------

    uint64_t _lastAppliedLogEntryDB() {
        std::string value;
        ROCKS_DB_CHECKED(_dbDontUseDirectly->Get({}, cdcMetadataKey(&LAST_APPLIED_LOG_ENTRY_KEY), &value));
        ExternalValue<U64Value> v(value);
        return v().u64();
    }

    uint64_t _lastAppliedLogEntry(rocksdb::Transaction& dbTxn) {
        std::string value;
        ROCKS_DB_CHECKED(dbTxn.Get({}, cdcMetadataKey(&LAST_APPLIED_LOG_ENTRY_KEY), &value));
        ExternalValue<U64Value> v(value);
        return v().u64();
    }

    void _advanceLastAppliedLogEntry(rocksdb::Transaction& dbTxn, uint64_t index) {
        uint64_t oldIndex = _lastAppliedLogEntry(dbTxn);
        ALWAYS_ASSERT(oldIndex+1 == index, "old index is %s, expected %s, got %s", oldIndex, oldIndex+1, index);
        LOG_DEBUG(_env, "bumping log index from %s to %s", oldIndex, index);
        StaticValue<U64Value> v;
        v().setU64(index);
        ROCKS_DB_CHECKED(dbTxn.Put({}, cdcMetadataKey(&LAST_APPLIED_LOG_ENTRY_KEY), v.toSlice()));
    }

    void _addToDirsToTxns(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req) {
        for (const auto dirId: directoriesNeedingLock(req)) {
            LOG_DEBUG(_env, "adding dir %s for txn %s", dirId, txnId);
            {
                // into the set
                StaticValue<DirsToTxnsKey> k;
                k().setDirId(dirId);
                k().setTxnId(txnId);
                ROCKS_DB_CHECKED(dbTxn.Put(_dirsToTxnsCf, k.toSlice(), ""));
            }
            {
                // sentinel, if necessary
                StaticValue<DirsToTxnsKey> k;
                k().setDirId(dirId);
                k().setSentinel();
                std::string v;
                auto status = dbTxn.Get({}, _dirsToTxnsCf, k.toSlice(), &v);
                if (status.IsNotFound()) { // we're the first ones here, add the sentinel
                    auto v = CDCTxnIdValue::Static(txnId);
                    ROCKS_DB_CHECKED(dbTxn.Put(_dirsToTxnsCf, k.toSlice(), v.toSlice()));
                } else {
                    ROCKS_DB_CHECKED(status);
                }
            }
        }
    }

    // Returns the txn ids that might be free to work now. Note that we don't
    // know that for sure because they might not hold locks for all dirs. This
    // function does not check that.
    void _removeFromDirsToTxns(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req, std::vector<CDCTxnId>& mightBeReady) {
        for (const auto dirId: directoriesNeedingLock(req)) {
            LOG_DEBUG(_env, "removing dir %s for txn %s", dirId, txnId);
            StaticValue<DirsToTxnsKey> k;
            k().setDirId(dirId);
            k().setTxnId(txnId);
            // Check if there's a next key in line. We know that we won't
            // have to step over many deleted keys here because we go through the
            // list in order, and we seek from the list element we've just deleted.
            // It is however important to set the iteration upper bound as of to
            // not spill over and possibly trip over deleted keys.
            StaticValue<DirsToTxnsKey> upperBoundK;
            upperBoundK().setDirId(dirId);
            upperBoundK().setTxnId(CDCTxnId(~(uint64_t)0));
            rocksdb::Slice upperBoundSlice = upperBoundK.toSlice();
            rocksdb::ReadOptions itOptions;
            itOptions.iterate_upper_bound = &upperBoundSlice;
            std::unique_ptr<rocksdb::Iterator> it(dbTxn.GetIterator(itOptions, _dirsToTxnsCf));
            it->Seek(k.toSlice());
            // we must find the key here -- we're removing it.
            ALWAYS_ASSERT(it->Valid());
            {
                // Additional safety check: the key is what we expect.
                auto foundKey = ExternalValue<DirsToTxnsKey>::FromSlice(it->key());
                ALWAYS_ASSERT(!foundKey().isSentinel() && foundKey().txnId() == txnId);
            }
            // now that we've done our checks, we can remove the key
            ROCKS_DB_CHECKED(dbTxn.Delete(_dirsToTxnsCf, k.toSlice()));
            // then we look for the next one, if there's anything,
            // and overwrite/delete the sentinel
            StaticValue<DirsToTxnsKey> sentinelK;
            sentinelK().setDirId(dirId);
            sentinelK().setSentinel();
            it->Next();
            if (it->Valid()) { // there's something, set the sentinel
                auto nextK = ExternalValue<DirsToTxnsKey>::FromSlice(it->key());
                auto sentinelV = CDCTxnIdValue::Static(nextK().txnId());
                LOG_DEBUG(_env, "selected %s as next in line after finishing %s", nextK().txnId(), txnId);
                mightBeReady.emplace_back(nextK().txnId());
                ROCKS_DB_CHECKED(dbTxn.Put(_dirsToTxnsCf, sentinelK.toSlice(), sentinelV.toSlice()));
            } else { // we were the last ones here, remove sentinel
                ROCKS_DB_CHECKED(it->status());
                ROCKS_DB_CHECKED(dbTxn.Delete(_dirsToTxnsCf, sentinelK.toSlice()));
            }
        }
    }

    // Check if we have a lock on all the directories that matter to the txn id.
    // It is assumed that the txnId in question is already in _dirsToTxnsCf.
    bool _isReadyToGo(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req) {
        for (const auto dirId: directoriesNeedingLock(req)) {
            // the sentinel _must_ be present -- at the very least us!
            StaticValue<DirsToTxnsKey> k;
            k().setDirId(dirId);
            k().setSentinel();
            std::string v;
            ROCKS_DB_CHECKED(dbTxn.Get({}, _dirsToTxnsCf, k.toSlice(), &v));
            ExternalValue<CDCTxnIdValue> otherTxnId(v);
            if (otherTxnId().id() != txnId) {
                return false;
            }
        }
        return true;
    }

    // Adds a request to the enqueued requests. Also adds it to dirsToTxns, which will implicitly
    // acquire locks.
    void _addToEnqueued(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req) {
        {
            auto k = CDCTxnIdKey::Static(txnId);
            std::string v = bincodeToRocksValue(req);
            ROCKS_DB_CHECKED(dbTxn.Put(_enqueuedCf, k.toSlice(), v));
        }
        _addToDirsToTxns(dbTxn, txnId, req);
    }

    // Moves the state forward, filling in `step` appropriatedly, and writing
    // out the updated state.
    template<template<typename> typename V>
    void _advance(
        rocksdb::Transaction& dbTxn,
        CDCTxnId txnId,
        const CDCReqContainer& req,
        const ShardRespContainer* shardResp,
        V<TxnState>& state,
        CDCStep& step,
        // to collect things that might be able to start now because
        // we've finished doing other stuff
        std::vector<CDCTxnId>& txnIds
    ) {
        LOG_DEBUG(_env, "advancing txn %s with state %s", txnId, state());
        StateMachineEnv sm(_env, _defaultCf, _parentCf, dbTxn, txnId, state().step(), step);
        switch (req.kind()) {
        case CDCMessageKind::MAKE_DIRECTORY:
            MakeDirectoryStateMachine(sm, req.getMakeDirectory(), state().getMakeDirectory()).resume(shardResp);
            break;
        case CDCMessageKind::HARD_UNLINK_DIRECTORY:
            HardUnlinkDirectoryStateMachine(sm, req.getHardUnlinkDirectory(), state().getHardUnlinkDirectory()).resume(shardResp);
            break;
        case CDCMessageKind::RENAME_FILE:
            RenameFileStateMachine(sm, req.getRenameFile(), state().getRenameFile()).resume(shardResp);
            break;
        case CDCMessageKind::SOFT_UNLINK_DIRECTORY:
            SoftUnlinkDirectoryStateMachine(sm, req.getSoftUnlinkDirectory(), state().getSoftUnlinkDirectory()).resume(shardResp);
            break;
        case CDCMessageKind::RENAME_DIRECTORY:
            RenameDirectoryStateMachine(sm, req.getRenameDirectory(), state().getRenameDirectory()).resume(shardResp);
            break;
        case CDCMessageKind::CROSS_SHARD_HARD_UNLINK_FILE:
            CrossShardHardUnlinkFileStateMachine(sm, req.getCrossShardHardUnlinkFile(), state().getCrossShardHardUnlinkFile()).resume(shardResp);
            break;
        default:
            throw TERN_EXCEPTION("bad cdc message kind %s", req.kind());
        }
        state().setStep(sm.txnStep);

        if (sm.finished) {
            // we finished immediately
            LOG_DEBUG(_env, "txn %s with req %s finished", txnId, req);
            _finishExecuting(dbTxn, txnId, req, txnIds);
        } else {
            // we still have something to do, persist
            _setExecuting(dbTxn, txnId, state);
        }
    }

    bool _isExecuting(rocksdb::Transaction& dbTxn, CDCTxnId txnId) {
        auto k = CDCTxnIdKey::Static(txnId);
        std::string v;
        auto status = dbTxn.Get({}, _executingCf, k.toSlice(), &v);
        if (status.IsNotFound()) {
            return false;
        }
        ROCKS_DB_CHECKED(status);
        return true;
    }

    template<template<typename> typename V>
    void _setExecuting(rocksdb::Transaction& dbTxn, CDCTxnId txnId, V<TxnState>& state) {
        auto k = CDCTxnIdKey::Static(txnId);
        ROCKS_DB_CHECKED(dbTxn.Put(_executingCf, k.toSlice(), state.toSlice()));
    }

    void _finishExecuting(rocksdb::Transaction& dbTxn, CDCTxnId txnId, const CDCReqContainer& req, std::vector<CDCTxnId>& txnIds) {
        {
            // delete from _executingCf
            auto k = CDCTxnIdKey::Static(txnId);
            ROCKS_DB_CHECKED(dbTxn.Delete(_executingCf, k.toSlice()));
        }
        // delete from dirsToTxnIds
        _removeFromDirsToTxns(dbTxn, txnId, req, txnIds);
    }

    // Starts executing the given transactions, if possible. If it managed
    // to start something, it immediately advances it as well (no point delaying
    // that).
    //
    // It modifies `txnIds` with new transactions we looked at if we immediately
    // finish executing txns that we start here.
    void _startExecuting(rocksdb::Transaction& dbTxn, std::vector<CDCTxnId>& txnIds, CDCStep& step) {
        CDCReqContainer req;
        for (int i = 0; i < txnIds.size(); i++) {
            CDCTxnId txnId = txnIds[i];
            auto reqK = CDCTxnIdKey::Static(txnId);
            std::string reqV;
            ROCKS_DB_CHECKED(dbTxn.Get({}, _enqueuedCf, reqK.toSlice(), &reqV));
            bincodeFromRocksValue(reqV, req);
            if (!_isExecuting(dbTxn, txnId)) {
                if (_isReadyToGo(dbTxn, txnId, req)) {
                    LOG_DEBUG(_env, "starting to execute txn %s with req %s, since it is ready to go and not executing already", txnId, req);
                    StaticValue<TxnState> txnState;
                    txnState().start(req.kind());
                    _setExecuting(dbTxn, txnId, txnState);
                    _advance(dbTxn, txnId, req, nullptr, txnState, step, txnIds);
                } else {
                    LOG_DEBUG(_env, "waiting before executing txn %s with req %s, since it is not ready to go", txnId, req);
                }
            }
        }
    }

    void _enqueueCDCReqs(
        rocksdb::Transaction& dbTxn,
        const std::vector<CDCReqContainer>& reqs,
        CDCStep& step,
        // we need two lists because one (`cdcReqsTxnIds`) is specifically
        // to return to the user, while the other is used for other purposes,
        // too.
        std::vector<CDCTxnId>& txnIdsToStart,
        std::vector<CDCTxnId>& cdcReqsTxnIds
    ) {
        for (const auto& req: reqs) {
            uint64_t txnId;
            {
                // Generate new txn id
                txnId = _lastTxn(dbTxn) + 1;
                _setLastTxn(dbTxn, txnId);
                // Push to queue
                _addToEnqueued(dbTxn, txnId, req);
                LOG_DEBUG(_env, "enqueued CDC req %s with txn id %s", req.kind(), txnId);
            }

            txnIdsToStart.emplace_back(txnId);
            cdcReqsTxnIds.emplace_back(txnId);
        }
    }

    void _advanceWithResp(
        rocksdb::Transaction& dbTxn,
        CDCTxnId txnId,
        const ShardRespContainer* resp,
        CDCStep& step,
        std::vector<CDCTxnId>& txnIdsToStart
    ) {
        auto txnIdK = CDCTxnIdKey::Static(txnId);

        // Get the req
        CDCReqContainer cdcReq;
        {
            std::string reqV;
            ROCKS_DB_CHECKED(dbTxn.Get({}, _enqueuedCf, txnIdK.toSlice(), &reqV));
            bincodeFromRocksValue(reqV, cdcReq);
        }

        // Get the state
        std::string txnStateV;
        ROCKS_DB_CHECKED(dbTxn.Get({}, _executingCf, txnIdK.toSlice(), &txnStateV));
        ExternalValue<TxnState> txnState(txnStateV);

        // Advance with response
        _advance(dbTxn, txnId, cdcReq, resp, txnState, step, txnIdsToStart);
    }

    void update(
        bool sync,
        uint64_t logIndex,
        const std::vector<CDCReqContainer>& cdcReqs,
        const std::vector<CDCShardResp>& shardResps,
        CDCStep& step,
        std::vector<CDCTxnId>& cdcReqsTxnIds
    ) {
        auto locked = _processLock.lock();

        rocksdb::WriteOptions options;
        options.sync = sync;
        std::unique_ptr<rocksdb::Transaction> dbTxn(_dbDontUseDirectly->BeginTransaction(options));

        _advanceLastAppliedLogEntry(*dbTxn, logIndex);

        step.clear();
        cdcReqsTxnIds.clear();

        {
            std::vector<CDCTxnId> txnIdsToStart;
            _enqueueCDCReqs(*dbTxn, cdcReqs, step, txnIdsToStart, cdcReqsTxnIds);
            for (const auto& resp: shardResps) {
                _advanceWithResp(*dbTxn, resp.txnId, &resp.resp, step, txnIdsToStart);
            }
            _startExecuting(*dbTxn, txnIdsToStart, step);
        }

        commitTransaction(*dbTxn);
    }

    void bootstrap(
        bool sync,
        uint64_t logIndex,
        CDCStep& step
    ) {
        auto locked = _processLock.lock();

        rocksdb::WriteOptions options;
        options.sync = sync;
        std::unique_ptr<rocksdb::Transaction> dbTxn(_dbDontUseDirectly->BeginTransaction(options));

        step.clear();

        _advanceLastAppliedLogEntry(*dbTxn, logIndex);

        std::vector<CDCTxnId> txnIdsToStart;
        // Just collect all executing txns, and run them
        std::unique_ptr<rocksdb::Iterator> it(dbTxn->GetIterator({}, _executingCf));
        for (it->SeekToFirst(); it->Valid(); it->Next()) {
            auto txnIdK = ExternalValue<CDCTxnIdKey>::FromSlice(it->key());
            _advanceWithResp(*dbTxn, txnIdK().id(), nullptr, step, txnIdsToStart);
        }
        ROCKS_DB_CHECKED(it->status());

        _startExecuting(*dbTxn, txnIdsToStart, step);

        commitTransaction(*dbTxn);
    }
};

CDCDB::CDCDB(Logger& logger, std::shared_ptr<XmonAgent>& xmon, SharedRocksDB& sharedDb) {
    _impl = new CDCDBImpl(logger, xmon, sharedDb);
}

CDCDB::~CDCDB() {
    delete ((CDCDBImpl*)_impl);
}

void CDCDB::applyLogEntry(bool sync, const CDCLogEntry& entry, CDCStep& step, std::vector<CDCTxnId>& cdcReqsTxnIds) {
    if (unlikely(entry.bootstrapEntry())) {
        ((CDCDBImpl*)_impl)->bootstrap(sync, entry.logIdx(), step);
    } else {
        ((CDCDBImpl*)_impl)->update(sync, entry.logIdx(), entry.cdcReqs(), entry.shardResps(), step, cdcReqsTxnIds);
    }
}

uint64_t CDCDB::lastAppliedLogEntry() {
    return ((CDCDBImpl*)_impl)->_lastAppliedLogEntryDB();
}
